Total Points: 5
Instructions:
Complete all cells marked # YOUR CODE HERE or # YOUR ANALYSIS HERE.
In this assignment you will use Dynamic Time Warping (DTW) to compare the similarity of different audio files. The audio folder included with this homework contains:
audio1a.wav and audio1b.wav - Two versions of the same song (instrumental), both recorded in a professional recording studio.
audio2a.wav, audio2b.wav, and audio2c.wav - Three versions of the same song with vocals, recorded in different conditions.
Grading: Each part is worth 1 point.
This assignment uses dtw-python which can be installed with the following command:
pip install dtw-python
Important: Use the dtw-python package and not the dtw package. If you installed dtw by mistake, uninstall it and re-install dtw-python, as there may be compatibility issues.
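Before working through the assignment, it may help to see the DTW recurrence on two short 1-D toy sequences. This is a self-contained sketch using only NumPy; the toy sequences and the cumulative initialization of the first row and column are my own choices (a common convention — the assignment's solution below instead leaves those cells at infinity):

```python
import numpy as np

# Toy 1-D sequences: b stretches the middle of a by repeating a value
a = np.array([0.0, 1.0, 2.0])
b = np.array([0.0, 1.0, 1.0, 2.0])

n, m = len(a), len(b)
D = np.full((n, m), np.inf)
D[0, 0] = abs(a[0] - b[0])
# Cumulative cost along the first row and column
for j in range(1, m):
    D[0, j] = D[0, j - 1] + abs(a[0] - b[j])
for i in range(1, n):
    D[i, 0] = D[i - 1, 0] + abs(a[i] - b[0])
# DTW recurrence: local cost plus the cheapest of the three predecessors
for i in range(1, n):
    for j in range(1, m):
        cost = abs(a[i] - b[j])
        D[i, j] = cost + min(D[i, j - 1], D[i - 1, j], D[i - 1, j - 1])

print(D[-1, -1])  # total alignment cost; 0.0 here, since b only repeats a value of a
```

The bottom-right cell holds the total alignment cost; because `b` differs from `a` only by repeating a sample, DTW can align them at zero cost.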
import numpy as np
import librosa
import IPython.display as ipd
from scipy.spatial import distance
from dtw import *
import matplotlib.pyplot as plt
%matplotlib inline
Write a function extract_mfccs() which returns MFCC features as computed by librosa.feature.mfcc, as well as the audio array. Use the default parameters. The function should also let you choose whether or not to return delta MFCCs to account for change over time. Use librosa.feature.delta for this.
def extract_mfccs(audio_path, delta=True):
    """Extract MFCCs (librosa default parameters) from an audio file

    Parameters
    ----------
    audio_path : str
        Path to input audio file
    delta : bool
        Whether or not to return delta-MFCCs

    Returns
    -------
    mfccs : np.ndarray, shape=(n_mfcc, n_frames)
        MFCC feature set
    audio : np.array, shape=(num_samples,)
        audio for playback
    fs : int
        sample rate
    """
    data, fs = librosa.load(audio_path)
    mfcc = librosa.feature.mfcc(y=data, sr=fs)
    if delta:
        mfcc_delta = librosa.feature.delta(mfcc)
        return mfcc, mfcc_delta, data, fs
    else:
        return mfcc, data, fs
Create a function my_dtw() which takes two time series (MFCCs) and outputs a DTW matrix.
Use scipy.spatial.distance.euclidean to calculate the distances.
def my_dtw(query, reference):
    """Compute a DTW matrix

    Parameters
    ----------
    query : np.array, shape=(num_features, query_len)
        query timeseries
    reference : np.array, shape=(num_features, reference_len)
        reference timeseries

    Returns
    -------
    dtw_matrix : np.ndarray, shape=(query_len, reference_len)
        dtw matrix where each axis is the length of query/reference
    """
    # Get lengths
    query_len = query.shape[1]
    reference_len = reference.shape[1]
    # Create placeholder initialized to infinity
    dtw_matrix = np.full((query_len, reference_len), np.inf)
    # Set starting point to 0
    dtw_matrix[0, 0] = 0
    # For each cell (excluding the first row and column), add the local
    # distance to the minimum cumulative cost of the three predecessor
    # cells (left, above, and diagonal).
    for i in range(1, query_len):
        for j in range(1, reference_len):
            cost = distance.euclidean(query[:, i], reference[:, j])
            dtw_matrix[i, j] = cost + min(dtw_matrix[i, j - 1],
                                          dtw_matrix[i - 1, j],
                                          dtw_matrix[i - 1, j - 1])
    return dtw_matrix
Create a function get_warp_path() where the input is a DTW matrix and the output is a matrix with the same dimensions as the input, but where each cell is a 0 except for the cells on the warp path, which are 1. Remember that the warp path is the least-cost (smallest-sum) path between $D[0,0]$ and $D[n,m]$. Remember that the warp path must follow a path from the beginning to the end, so using argmin on an entire column may not work.
Hint: You may get better results if you work your way back from the end of the matrix (end of both timeseries), following the path with the least cost backwards.
def get_warp_path(dtw_matrix):
    """Get the warp path (best relationship between query and reference) from the DTW matrix

    Parameters
    ----------
    dtw_matrix : np.ndarray, shape=(query_len, reference_len)
        dtw matrix where each axis is the length of query and reference

    Returns
    -------
    warp : np.ndarray, shape=(query_len, reference_len)
        warp path matrix where all elements are 0 except the warp path, which is 1
    """
    # Unpack dimensions; build the path in a separate zero matrix so the
    # input DTW matrix is left intact
    (num_rows, num_cols) = dtw_matrix.shape
    warp = np.zeros_like(dtw_matrix)
    i = num_rows - 1
    j = num_cols - 1
    # Going from end to start, mark the current cell with 1, then move to
    # whichever of the three predecessor cells (left, diagonal, up) has
    # the smallest cumulative cost.
    while i > 0 and j > 0:
        warp[i, j] = 1
        # Candidate array with three directions: left, diagonal, up
        candidate = np.array([dtw_matrix[i, j - 1],
                              dtw_matrix[i - 1, j - 1],
                              dtw_matrix[i - 1, j]])
        direction = np.argmin(candidate)
        if direction == 0:    # going left
            j -= 1
        elif direction == 1:  # going diagonally
            i -= 1
            j -= 1
        else:                 # going up
            i -= 1
    # Mark the last (start) cell with 1
    warp[i, j] = 1
    return warp
# I commented out test1 and test2 below because the my_dtw function takes 2-dimensional arrays.
# I could write a separate one that takes 1-D arrays, but I don't think it's necessary for
# MFCC inputs, which are 2-dimensional.
# test1 = np.array([1, 0, 1, 0, 1, 0, 1])
# test2 = np.array([1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1])
np.random.seed(0)
test1 = np.random.rand(3, 5)
test2 = np.random.rand(3, 3)
# USE THIS CODE TO TEST DTW WITH THE DTW-PYTHON PACKAGE
# test_dtw = dtw(test1, test2)
# test_dtw.plot()
# UNCOMMENT FOR YOUR TEST
a = my_dtw(test1, test2)
display(a)
warp = get_warp_path(a)
display( warp )
plt.imshow(warp, origin='lower')
Plot the warp path (using either plot or imshow) from the DTW matrix:
Then, using the warp path matrix and the code below, create 2 IPython audio objects that begin playing at any given matching point for any two audio files. For example, if frame 100 in time series A is warped to frame 120 in time series B, your two audio players should start at those points. Try this with several different reference_position values to see how well the algorithm works. Also remember to convert frames back to samples before passing the audio to IPython.Audio.
# USE THIS CODE TO MATCH AN ARBITRARY REFERENCE POSITION TO THE QUERY POSITION
# warp = get_warp_path(dtw_matrix)
# reference_position = [reference frame number]
# query_position = np.argmax(warp[reference_position,:])
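As a reminder of the frame-to-sample conversion mentioned above: with librosa's defaults, each MFCC frame advances by a hop of 512 samples, so a frame index converts to a sample index by multiplying by the hop length (librosa.frames_to_samples does the same thing). A minimal sketch, assuming the default hop_length of 512 — the helper name frame_to_sample is mine, not part of the assignment:

```python
def frame_to_sample(frame_index, hop_length=512):
    """Convert an MFCC frame index to the corresponding audio sample index.

    hop_length defaults to 512, librosa's default hop for librosa.feature.mfcc.
    """
    return frame_index * hop_length

# Frame 120 of the query corresponds to this sample offset in the raw audio:
start_sample = frame_to_sample(120)  # 120 * 512 = 61440
```

The resulting sample index is where playback should begin when slicing the audio array for IPython.Audio.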
def plot(ref_audio_path, query_audio_path):
    # Get MFCCs
    mfcc_ref, mfcc_delta_ref, data_ref, fs_ref = extract_mfccs(ref_audio_path)
    mfcc_query, mfcc_delta_query, data_query, fs_query = extract_mfccs(query_audio_path)
    # Get DTW and path matrices (dtw_matrix avoids shadowing the imported dtw function)
    dtw_matrix = my_dtw(mfcc_query, mfcc_ref)
    path = get_warp_path(dtw_matrix)
    # Get time vectors for plotting
    time_ref = len(data_ref) / fs_ref
    time_query = len(data_query) / fs_query
    time_vector_ref = np.linspace(0, time_ref, path.shape[1])
    time_vector_query = np.linspace(0, time_query, path.shape[0])
    # Plot path
    plt.figure(figsize=(8, 8))
    plt.xlabel("Reference Track (seconds)")
    plt.ylabel("Query Track (seconds)")
    plt.title("Dynamic Time Warping")
    plt.pcolormesh(time_vector_ref, time_vector_query, path)
    plt.tight_layout()
    plt.show()
    return path, data_ref, data_query, fs_ref, fs_query
def plot_with_delta(ref_audio_path, query_audio_path):
    # Get MFCCs
    mfcc_ref, mfcc_delta_ref, data_ref, fs_ref = extract_mfccs(ref_audio_path)
    mfcc_query, mfcc_delta_query, data_query, fs_query = extract_mfccs(query_audio_path)
    # Get DTW and path matrices (using delta-MFCCs)
    dtw_matrix = my_dtw(mfcc_delta_query, mfcc_delta_ref)
    path = get_warp_path(dtw_matrix)
    # Get time vectors for plotting
    time_ref = len(data_ref) / fs_ref
    time_query = len(data_query) / fs_query
    time_vector_ref = np.linspace(0, time_ref, path.shape[1])
    time_vector_query = np.linspace(0, time_query, path.shape[0])
    # Plot path
    plt.figure(figsize=(8, 8))
    plt.xlabel("Reference Track (seconds)")
    plt.ylabel("Query Track (seconds)")
    plt.title("Dynamic Time Warping")
    plt.pcolormesh(time_vector_ref, time_vector_query, path)
    plt.tight_layout()
    plt.show()
    return path, data_ref, data_query, fs_ref, fs_query
def sonify(path, data_ref, data_query, fs_ref, fs_query, start_position):
    """
    start_position: float between 0 and 1 (inclusive), to choose playback position
    """
    # Convert reference start position to query start position
    reference_position = int(path.shape[1] * start_position)
    query_mfcc_frame = np.argmax(path[:, reference_position])
    query_position = query_mfcc_frame / path.shape[0]
    query_position_sample_position = int(len(data_query) * query_position)
    # Slice reference and query audio data at their corresponding start positions
    reference_sample_position = int(len(data_ref) * start_position)
    data_ref_sliced = data_ref[reference_sample_position:]
    audio_ref = ipd.Audio(data_ref_sliced, rate=fs_ref)
    data_query_sliced = data_query[query_position_sample_position:]
    audio_query = ipd.Audio(data_query_sliced, rate=fs_query)
    # Play audio
    print("Playing reference audio at", int(start_position * 100), "%")
    ipd.display(audio_ref)
    print("Playing query audio at", int(start_position * 100), "% (warped)")
    ipd.display(audio_query)
    return
ref = "audio/audio1a.wav"
query = "audio/audio1b.wav"
path, data_ref, data_query, fs_ref, fs_query = plot( ref, query )
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.7)
Given that Dynamic Time Warping requires two time series, a reference and a query, develop a method for analyzing the success of the algorithm. How would you quantify results? What additional data (if any) would you need to be able to improve the accuracy of your metrics? If you use existing research, please cite your sources in your analysis.
A few things to consider:
Feel free to use your own music (two click tracks of different speeds may be helpful) and create plots as necessary in your analysis.
Let's first plot and sonify a few pairs of audio 2
ref = "audio/audio2a.wav"
query = "audio/audio2b.wav"
path, data_ref, data_query, fs_ref, fs_query = plot( ref, query )
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.7)
ref = "audio/audio2a.wav"
query = "audio/audio2c.wav"
path, data_ref, data_query, fs_ref, fs_query = plot( ref, query )
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.7)
Comparing the audio and the plots, the algorithm seems to perform well in both cases, though the difference between 2a and 2c appears smaller than the difference between 2a and 2b. This conclusion is supported both by the plots, where the warp path for 2a vs. 2b deviates more from a straight line, and by the recordings.
Below, we see the comparison between two completely different tracks.
ref = "audio/voice.aif"
query = "audio/drums.aif"
path, data_ref, data_query, fs_ref, fs_query = plot( ref, query )
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.5)
We see that the path is highly non-linear, which is also reflected in the recordings. This led me to examine the linearity of the warp path produced by the DTW algorithm as a measure of similarity between two tracks.
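One simple way to make that linearity idea concrete is to measure how far the warp path strays from the straight diagonal. This is a self-contained sketch on toy 0/1 path matrices; the helper name path_diagonal_deviation and the normalization choice are mine, not part of the assignment:

```python
import numpy as np

def path_diagonal_deviation(warp):
    """Mean absolute deviation of a warp path from the straight diagonal,
    normalized by the query length, so 0 means a perfectly linear alignment.

    warp : 0/1 matrix of shape (query_len, reference_len), with 1s on the path.
    """
    query_len, reference_len = warp.shape
    # For each reference frame, the query frame matched by the path
    matched = np.argmax(warp, axis=0)
    # The query frame a perfectly linear alignment would pick
    diagonal = np.linspace(0, query_len - 1, reference_len)
    return np.mean(np.abs(matched - diagonal)) / query_len

# A perfectly diagonal toy path deviates by 0:
print(path_diagonal_deviation(np.eye(4)))  # 0.0
```

Larger values indicate more warping; like the R² comparison explored later in this analysis, this reduces the path plot to a single number that can be compared across pairs of tracks.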
Before going further with that idea, let's see what using mfcc_delta does to the path matrix.
ref = "audio/audio1a.wav"
query = "audio/audio1b.wav"
path, data_ref, data_query, fs_ref, fs_query = plot_with_delta( ref, query )
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.7)
It looks like the path becomes somewhat smoother; let's compare a few different click tracks to be sure. To do that, I redefined some functions to take an audio data array as input rather than an audio file path.
def extract_mfccs_array(audio, rate, delta=True):
    """Extract MFCCs (librosa default parameters) from an audio data array

    Parameters
    ----------
    audio : np.array
        audio data
    rate : int
        sample rate of the audio data
    delta : bool
        Whether or not to return delta-MFCCs

    Returns
    -------
    mfccs : np.ndarray, shape=(n_mfcc, n_frames)
        MFCC feature set
    audio : np.array, shape=(num_samples,)
        audio for playback
    fs : int
        sample rate
    """
    data = audio
    fs = rate
    mfcc = librosa.feature.mfcc(y=data, sr=fs)
    if delta:
        mfcc_delta = librosa.feature.delta(mfcc)
        return mfcc, mfcc_delta, data, fs
    else:
        return mfcc, data, fs
def plot_with_array(ref_audio_data, query_audio_data, rate):
    # Get MFCCs
    mfcc_ref, mfcc_delta_ref, data_ref, fs_ref = extract_mfccs_array(ref_audio_data, rate, delta=True)
    mfcc_query, mfcc_delta_query, data_query, fs_query = extract_mfccs_array(query_audio_data, rate, delta=True)
    # Get DTW and path matrices
    dtw_matrix = my_dtw(mfcc_query, mfcc_ref)
    path = get_warp_path(dtw_matrix)
    # Get time vectors for plotting
    time_ref = len(data_ref) / fs_ref
    time_query = len(data_query) / fs_query
    time_vector_ref = np.linspace(0, time_ref, path.shape[1])
    time_vector_query = np.linspace(0, time_query, path.shape[0])
    # Plot path
    plt.figure(figsize=(8, 8))
    plt.xlabel("Reference Track (seconds)")
    plt.ylabel("Query Track (seconds)")
    plt.title("Dynamic Time Warping")
    plt.pcolormesh(time_vector_ref, time_vector_query, path)
    plt.tight_layout()
    plt.show()
    return path, data_ref, data_query, fs_ref, fs_query
def plot_with_array_delta(ref_audio_data, query_audio_data, rate):
    # Get MFCCs
    mfcc_ref, mfcc_delta_ref, data_ref, fs_ref = extract_mfccs_array(ref_audio_data, rate, delta=True)
    mfcc_query, mfcc_delta_query, data_query, fs_query = extract_mfccs_array(query_audio_data, rate, delta=True)
    # Get DTW and path matrices (using delta-MFCCs)
    dtw_matrix = my_dtw(mfcc_delta_query, mfcc_delta_ref)
    path = get_warp_path(dtw_matrix)
    # Get time vectors for plotting
    time_ref = len(data_ref) / fs_ref
    time_query = len(data_query) / fs_query
    time_vector_ref = np.linspace(0, time_ref, path.shape[1])
    time_vector_query = np.linspace(0, time_query, path.shape[0])
    # Plot path
    plt.figure(figsize=(8, 8))
    plt.xlabel("Reference Track (seconds)")
    plt.ylabel("Query Track (seconds)")
    plt.title("Dynamic Time Warping")
    plt.pcolormesh(time_vector_ref, time_vector_query, path)
    plt.tight_layout()
    plt.show()
    return path, data_ref, data_query, fs_ref, fs_query
# Makes click track audio data arrays
def make_click(bpm, sample_rate, duration):
    y = np.zeros(sample_rate * duration)
    for i in range(len(y)):
        if i % (np.floor(sample_rate / (bpm / 60))) == 0:
            y[i] = 1
    _, beat = librosa.beat.beat_track(y=y, start_bpm=bpm, sr=sample_rate)
    y_beats = librosa.clicks(frames=beat, sr=sample_rate)
    audio = ipd.Audio(y_beats, rate=sample_rate)
    ipd.display(audio)
    return y_beats, sample_rate
click1, rate = make_click( 200, 44100, 5 )
click2, rate = make_click( 100, 44100, 10 )
ref = click1
query = click2
path, data_ref, data_query, fs_ref, fs_query = plot_with_array(ref, query, rate=rate)
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.7)
path, data_ref, data_query, fs_ref, fs_query = plot_with_array_delta(ref, query, rate=rate)
sonify(path, data_ref, data_query, fs_ref, fs_query, 0.7)
It's a bit hard to see, but the path does seem slightly more smoothed out. Since evaluating the success of the algorithm by listening is largely subjective, I think an objective way to quantify the difference between two tracks is to measure the non-linearity of the DTW path plots we generated.
Let's look at the two completely different files again and measure the R2 score of the path against a straight line from start to finish.
ref = "audio/voice.aif"
query = "audio/drums.aif"
path, data_ref, data_query, fs_ref, fs_query = plot( ref, query )
from sklearn.metrics import r2_score
path.shape
line = np.linspace( 0, path.shape[0], path.shape[1] )
predict = np.argmax( path, axis = 0 )
r2 = r2_score(line, predict)
plt.plot(predict, label = "predict")
plt.plot(line, label = "linear")
plt.legend()
print("The R2 score between the DTW path and a straight line is", r2)
In contrast, let's measure the linearity for audio1a.wav and audio1b.wav, which are arguably more similar to each other than the voice and drum tracks are.
ref = "audio/audio1a.wav"
query = "audio/audio1b.wav"
path, data_ref, data_query, fs_ref, fs_query = plot_with_delta( ref, query )
line = np.linspace( 0, path.shape[0], path.shape[1] )
predict = np.argmax( path, axis = 0 )
r2 = r2_score(line, predict)
plt.plot(predict, label = "predict")
plt.plot(line, label = "linear")
plt.legend()
print("The R2 score between the DTW path and a straight line is", r2)
We see that this time we get a much more linear result.
In conclusion, I think this linearity measure can be used to evaluate how similar two tracks are to each other. Evaluating the success of the DTW algorithm itself, on the other hand, will still require more subjective methods, such as listening to the warped playback.